生产者可以理解为是生产数据的,消费者可以理解为处理数据的
1. 为什么要使用生产者和消费者模式
- 从函数的方面理解:如果我要生产一个数据,然后将这个数据给函数,让函数依赖这个数据进行运算,且这一过程就是同步过程,如果数据生产的很慢,函数就会一直等待,后面的代码也无法执行
- 从进程的方面理解:在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。(通俗理解:如果数据产生的快,处理的慢,或者产生的慢,消费的快,那么就使生产者和消费者模式)
2. 什么是生产者和消费者模式
生产者和消费者彼此之间不进行直接的通信,而是通过队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给队列,消费者不找生产者要数据,而是直接从队列里取,队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
3. 做包子和吃包子例子
- 例如包子做的快而吃的慢
import time
from multiprocessing import Process
from multiprocessing import Queue
# 生产者
def producer(q):
for i in range(100):
q.put('%s.包子' % i) # 将生产的数据添加到队列中
# 消费者
def consumer(q):
for i in range(100):
time.sleep(1) # 模拟吃包子需要1秒时间
print('吃了%s' % q.get()) # 从队列中获取生产的数据,不直接先生产者获取
if __name__ == '__main__':
q = Queue(10) # 限制队列的长度,因为如果一下子生产了100个数据放进队列中,且消费者进程消费的很慢,那么没有被消费的数据就会一直在内存中等待被消费,一直占用着内存
# 生产者进程 -> 如果生产的很慢想提高生产速度可以开多几个进程进行处理
p = Process(target=producer, args=(q,))
p.start()
# 消费者进程 -> 如果消费的很慢想提高消费速度可以开多几个进程进行处理
c1 = Process(target=consumer, args=(q,))
c2 = Process(target=consumer, args=(q,))
c1.start()
c2.start()
4. 解决生产者和消费者模式中不平衡的问题
- 当我们循环去获取队列中生产者所生产的值的时候,我们不知道生产者会生产了多少个数据,这样就会造成循环无法结束 q.get() 一直在等待着获取队里中的数据,那么程序就会进入阻塞无法结束程序
# 无法结束循环,程序进入阻塞无法结束程序
import time
from multiprocessing import Process
from multiprocessing import Queue
# 生产者
def producer(q):
for i in range(50):
q.put('%s.包子' % i)
# 消费者
def consumer(q):
while True: # 一般情况下会使用 while 去获取数据,因为我们不知道生产者会生产多少个数据,但是这样也造成了循环无法结束
time.sleep(1)
print('吃了%s' % q.get()) # 循环无法借宿导致 q.get() 一直在等待获取队列中的数据
if __name__ == '__main__':
q = Queue(10)
# 生产者进程
p = Process(target=producer, args=(q,))
p.start()
# 消费者进程
c1 = Process(target=consumer, args=(q,))
c2 = Process(target=consumer, args=(q,))
c1.start()
c2.start()
- 解决方法一
- 生产完数据后再向队列中添加多一个标识数据,当消费者循环获取数据的时候就要进行判断,拿到的数据是不是等于标识数据,如果等于就说明消费者已经获取完所有生产者的数据,可以结束循环
- 存在问题: 如果有100个消费进程,生产完数据后就要往队列添加100个标识数据
import time
from multiprocessing import Process
from multiprocessing import Queue
# 生产者
def producer(q):
for i in range(50):
q.put('%s.包子' % i)
# 如果有3个消费进程,那么就要往队列添加3个标识数据
q.put(None) # 当生产完数据后再向队列中添加多一个标识数据
q.put(None) # 当生产完数据后再向队列中添加多一个标识数据
q.put(None) # 当生产完数据后再向队列中添加多一个标识数据
# 消费者
def consumer(q):
while True:
time.sleep(1)
bun = q.get()
if bun is None: # 判断拿到的数据是不是等于标识数据,如果等于就说明消费者已经获取完所有生产者的数据,可以结束循环
break
print('吃了%s' % bun)
if __name__ == '__main__':
q = Queue(10)
# 生产者进程
p = Process(target=producer, args=(q,))
p.start()
# 消费者进程
c1 = Process(target=consumer, args=(q,))
c2 = Process(target=consumer, args=(q,))
c3 = Process(target=consumer, args=(q,))
c1.start()
c2.start()
c3.start()
- 解决方法二 -> 使用 JoinableQueue 队列
- 使用 JoinableQueue 队列的执行顺序:生产者生产的数据全部被消费完 -> 生产者进程结束 -> 主进程代码执行结束 -> 消费者守护进程结束
import time
from multiprocessing import Process
from multiprocessing import JoinableQueue
# 生产者
def producer(q):
for i in range(50):
q.put('%s.包子' % i)
q.join() # 等待 消费者进程 把所有的数据处理完
# 消费者
def consumer(q):
while True:
time.sleep(1)
bun = q.get()
print('吃了%s' % bun)
q.task_done() # 告诉生产者进程我处理完了这一个数据
if __name__ == '__main__':
q = JoinableQueue()
# 生产者进程
p = Process(target=producer, args=(q,))
p.start()
# 消费者进程 -> 所有的消费者进程都要开启守护进程,当主进程的代码执行结束后,消费者进程也随之结束,里面的循环也不会被执行,这样就不用像解决办法一通过标识数据来结束循环
c1 = Process(target=consumer, args=(q,))
c1.daemon = True # 开启守护进程
c1.start()
c2 = Process(target=consumer, args=(q,))
c2.daemon = True # 开启守护进程
c2.start()
c3 = Process(target=consumer, args=(q,))
c3.daemon = True # 开启守护进程
c3.start()
# 主进程中所有的生产者进程都要执行 .join 方法,等待 xxx 生产者进程结束
p.join() # 等待 p 生产者进程结束,如果其他的生产者进程,那么也要执行 .join 方法等待 xxx 生产者进程结束